package com.hg.normal.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * created by skh on 2019/12/14
 */
public class Consumer {

	public static void main(String[] args) throws Exception {

		//创建一个消息消费者,并设置一个消息消费者组
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_java");
		//指定NameServer地址
		consumer.setNamesrvAddr("47.98.129.125:9876");
		//设置consumer第一次启动时是从队列头部还是队列尾部开始消费的
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		//订阅指定 Topic 下的所有消息
		consumer.subscribe("topic_example_java", "*");

		//注册消息监听器
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				//默认list里只有一条消息,可以通过设置参数来批量接收消息
				System.out.println("消息数量:" + msgs.size());
				for (MessageExt msg : msgs) {
					try {
						System.out.println("收到消息:" + new String(msg.getBody(), "UTF-8") + ",消息id:" + msg.getMsgId());
					} catch (UnsupportedEncodingException e) {
						e.printStackTrace();
					}
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});

		//消费者对象在使用之前必须要调用start方法初始化
		consumer.start();
		System.out.println("消费者启动");
	}
}
